package com.atguigu.app.dws;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.Func.MyWindowFunction;
import com.atguigu.Util.ClickHouseUtil;
import com.atguigu.Util.DateFormatUtil;
import com.atguigu.Util.MyKafkaUtil;
import com.atguigu.bean.UserRegisterBean;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author hjy
 * @create 2023/3/15 20:17
 */
//todo 没跑通，关不了窗没法触发计算 watermark有问题
//数据流:app/客户端->mysql->maxwell->kafka(topic_db  ods)->BaseDBApp->kafka(dwd)->flinkApp->clickHouse
//程 序:mock->mysql->maxwell->kafka(zk)->BaseDBApp->kafka(zk)->Dws05_UserUserRegisterWindow->clickHous
public class Dws05_UserUserRegisterWindow {
    public static void main(String[] args) throws Exception {
        //todo 1 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
//        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall-flink/check");
//        env.getCheckpointConfig().setCheckpointTimeout(60000L);
//        env.setStateBackend(new HashMapStateBackend());
//        System.setProperty("HADOOP_USER_NAME","atguigu");
        //todo 2 从kafka（dwd_user_register）读取数据
        String topic = "dwd_user_register";
        String groupId = "dws_user_register_window";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));
        //todo 3 转化为JSONObject对象 并加上时间戳
        SingleOutputStreamOperator<JSONObject> jsonObjWithWMDS = kafkaDS.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String value) throws Exception {
                return JSONObject.parseObject(value);
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                .<JSONObject>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(TimestampAssignerSupplier.of(new SerializableTimestampAssigner<JSONObject>() {
                    @Override
                    public long extractTimestamp(JSONObject element, long recordTimestamp) {
                        return DateFormatUtil.toTs(element.getString("create_time"), true);
                    }
                })));
        //todo 4 转为JAVAbean对象
        SingleOutputStreamOperator<UserRegisterBean> javaBeanDS = jsonObjWithWMDS.map(new MapFunction<JSONObject, UserRegisterBean>() {
            @Override
            public UserRegisterBean map(JSONObject value) throws Exception {
                return new UserRegisterBean("",
                        "",
                        1L,
                        null);
            }
        });
        //todo 5 开窗聚合
        SingleOutputStreamOperator<UserRegisterBean> resultDS = javaBeanDS.windowAll(TumblingEventTimeWindows
                .of(Time.seconds(10)))

                .reduce(new ReduceFunction<UserRegisterBean>() {
                    @Override
                    public UserRegisterBean reduce(UserRegisterBean value1, UserRegisterBean value2) throws Exception {
                        value1.setRegisterCt(value1.getRegisterCt() + value2.getRegisterCt());
                        return value1;
                    }
                }, new AllWindowFunction<UserRegisterBean, UserRegisterBean, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<UserRegisterBean> values, Collector<UserRegisterBean> out) throws Exception {
                        UserRegisterBean next = values.iterator().next();
                        out.collect(MyWindowFunction.getJavabeanFields(next, window));
                    }
                });
        //todo 6 写出数据到clickHouse
        resultDS.print("resultDS>>>>>>>>>>>>>>>>>>");
        resultDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_user_user_register_window values(?,?,?,?)"));
        //todo 7 启动程序
        env.execute();
    }
}
